package com.flink.examples.demo;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
 * @Author JL
 * @Date 2021/01/15
 * @Version V1.0
 */
public class SelectToKafka {

    /**
     官方参考：https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     开始偏移位置
     config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是：
         group-offsets：从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。
         earliest-offset：从最早的偏移量开始。
         latest-offset：从最新的偏移量开始。
         timestamp：从每个分区的用户提供的时间戳开始。
         specific-offsets：从每个分区的用户提供的特定偏移量开始。
     默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费

     一致性保证
     sink.semantic选项来选择三种不同的操作模式：
         NONE：Flink不能保证任何事情。产生的记录可能会丢失或可以重复。
         AT_LEAST_ONCE （默认设置）：这样可以确保不会丢失任何记录（尽管它们可以重复）。
         EXACTLY_ONCE：Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时，请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level（read_committed 或read_uncommitted-后者是默认值）。
     */
    static String table_sql = "CREATE TABLE order_source (\n" +
            "  `orderId` STRING,\n" +
            "  `userName` STRING,\n" +
            "  `goodsType` STRING,\n" +
            "  `orderTime` STRING,\n" +
            "  `orderTimeSeries` BIGINT,\n" +
            "  `totalPrice` DECIMAL(10, 2),\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'order_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默认流时间方式
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        //注册kafka数据维表
        tEnv.executeSql(table_sql);

        String sql = "select orderId,userName,goodsType,orderTime,orderTimeSeries,totalPrice from order_source";
        Table table = tEnv.sqlQuery(sql);
        //打印字段结构
        table.printSchema();

        //table 转成 dataStream 流
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();

        env.execute();
    }

}
